package com.lyon.dmeo.storage.client.raft.endpoint;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.SerializeUtil;
import com.lyon.demo.protocol.api.core.command.ResponseFuture;
import com.lyon.demo.protocol.api.remoting.RequestProcessorDelegating;
import com.lyon.demo.protocol.api.transport.Transport;
import com.lyon.demo.protocol.api.transport.TransportClient;
import com.lyon.demo.protocol.api.transport.TransportServer;
import com.lyon.demo.protocol.api.transport.config.RemoteClientConfig;
import com.lyon.demo.protocol.netty.command.CommonRemoteCommand;
import com.lyon.demo.protocol.netty.command.RemoteCommand;
import com.lyon.demo.storage.client.api.core.command.request.HeartbeatRequest;
import com.lyon.demo.storage.client.api.core.command.request.PushEntryRequest;
import com.lyon.demo.storage.client.api.core.command.request.RequestOrResponse;
import com.lyon.demo.storage.client.api.core.command.request.VoteRequest;
import com.lyon.demo.storage.client.api.core.command.response.HeartBeatResponse;
import com.lyon.demo.storage.client.api.core.command.response.PushEntryResponse;
import com.lyon.demo.storage.client.api.core.command.response.VoteResponse;
import com.lyon.demo.storage.client.api.core.config.DLedgerConfig;
import com.lyon.demo.storage.common.spi.DefaultSpiLoader;
import com.lyon.demo.storage.common.thread.StartupShutdownAble;
import com.lyon.demo.storage.client.api.core.core.MemberState;
import com.lyon.dmeo.storage.client.raft.DLedgerServer;
import com.lyon.dmeo.storage.client.raft.core.proccessor.DLedgerRpcService;
import com.lyon.dmeo.storage.client.raft.core.proccessor.core.AbstractRequestProcessor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.lyon.dmeo.storage.client.raft.core.RemoteHandlerCodes.*;

/**
 * @author LeeYan9
 * @since 2022-05-10
 */
@SuppressWarnings({"rawtypes", "unchecked", "AlibabaServiceOrDaoClassShouldEndWithImpl"})
@Slf4j
public class RpcRemoteService implements DLedgerRpcService, StartupShutdownAble, RequestProcessorDelegating<RemoteCommand> {

    private final TransportClient transportClient;
    private final TransportServer transportServer;
    private final DLedgerServer dLedgerServer;
    private final MemberState memberState;
    private final AbstractRequestProcessor requestProcessor;
    private final DLedgerConfig dLedgerConfig;
    private final long connectTimeout;
    private final long readTimeoutMills;
    private final Duration readTimeout;

    public RpcRemoteService(DLedgerServer dLedgerServer) {
        this.dLedgerServer = dLedgerServer;
        this.memberState = dLedgerServer.getMemberState();
        this.dLedgerConfig = dLedgerServer.getDLedgerConfig();
        String remoteProtocol = dLedgerConfig.getRemoteProtocol();
        // 远程通信协议
        this.transportClient = DefaultSpiLoader.INSTANCE.loader(TransportClient.class, remoteProtocol);
        this.transportServer = DefaultSpiLoader.INSTANCE.loader(TransportServer.class, remoteProtocol);
        this.requestProcessor = DefaultSpiLoader.INSTANCE.loader(AbstractRequestProcessor.class, remoteProtocol);
        Assert.notNull(requestProcessor, "通信处理器不能为空");
        Assert.notNull(transportClient, "通信客户端不能为空");
        Assert.notNull(transportServer, "请求服务端不能为空");
        this.requestProcessor.setDelegate(this);
        this.connectTimeout = dLedgerConfig.getConnectTimeout();
        this.readTimeoutMills = dLedgerConfig.getReadTimeout();
        this.readTimeout = Duration.ofMillis(readTimeoutMills);

        RemoteClientConfig remoteServerConfig = dLedgerServer.getDLedgerConfig().getRemoteServerConfig();
        remoteServerConfig.setListenPort(memberState.getSelfAddress().getPort());
        transportServer.setConfig(remoteServerConfig);
        transportClient.setConfig(dLedgerServer.getDLedgerConfig().getRemoteClientConfig());

    }

    @Override
    public void startup() {
        transportServer.prepare(null);
        transportServer.start();
        transportClient.prepare(null);
        transportClient.start();
        transportServer.addRequestProcessor(requestProcessor, null);
    }


    @Override
    public void shutdown() {
        this.transportClient.shutdown();
        this.transportServer.shutdown();
    }

    @Override
    @SneakyThrows
    public RemoteCommand processRequest(RemoteCommand command) {
        CompletableFuture future = null;
        Object response = null;
        switch (command.handlerCode()) {
            case HEART_BEAT:
                future = handleHeartBeat(SerializeUtil.deserialize(command.getPayload()));
                break;
            case VOTE:
                future = handleVote(SerializeUtil.deserialize(command.getPayload()));
                break;
            case PUSH:
                future = handlePush(SerializeUtil.deserialize(command.getPayload()));
                break;
            default:
                future = CompletableFuture.failedFuture(new RuntimeException("未找到可处理请求"));
                break;
        }
        response = future.get();
        return CommonRemoteCommand.ofResponse(command, response);
    }


    @Override
    public CompletableFuture<HeartBeatResponse> heartBeat(HeartbeatRequest heartBeatRequest) {
        return doAsyncSend(heartBeatRequest, HEART_BEAT);
    }

    @Override
    public CompletableFuture<PushEntryResponse> push(PushEntryRequest pushEntryRequest) {
        return doAsyncSend(pushEntryRequest, PUSH);
    }

    private CompletableFuture doAsyncSend(RequestOrResponse request, Integer handleCode) {
        Assert.notNull(request, "请求命令内容不能为空");
        // 创建数据传输通道
        Transport<RemoteCommand, CommonRemoteCommand> transport =
                transportClient.createIfAbsent(memberState.getPeerIp(request.getRemoteId()), Duration.ofMillis(connectTimeout));
        // 发送异步请求，双工通信
        CompletableFuture future = new CompletableFuture<>();
        transport.asyncSend(CommonRemoteCommand.ofRequest(handleCode, SerializeUtil.serialize(request)), readTimeout,
                responseFuture -> {
                    if (responseFuture.getCause() == null) {
                        future.complete(responseFuture.getData().parseData());
                    } else {
                        future.completeExceptionally(responseFuture.getCause());
                    }
                });
        return future;
    }

    private ResponseFuture<CommonRemoteCommand> doSend(RequestOrResponse request, Integer handleCode) {
        Assert.notNull(request, "请求命令内容不能为空");
        // 创建数据传输通道
        Transport<RemoteCommand, CommonRemoteCommand> transport =
                transportClient.createIfAbsent(memberState.getPeerIp(request.getRemoteId()), Duration.ofMillis(connectTimeout));
        // 发送异步请求，双工通信
        return transport.send(CommonRemoteCommand.ofRequest(handleCode, SerializeUtil.serialize(request)), readTimeout);
    }

    @Override
    public CompletableFuture<VoteResponse> vote(VoteRequest voteRequest) {
        return doAsyncSend(voteRequest, VOTE);
    }

    @Override
    public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartbeatRequest heartbeatRequest) {
        return dLedgerServer.handleHeartBeat(heartbeatRequest);
    }

    @Override
    public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest entryRequest) {
        return dLedgerServer.handlePush(entryRequest);
    }

    @Override
    public CompletableFuture<VoteResponse> handleVote(VoteRequest voteRequest) {
        return dLedgerServer.handleVote(voteRequest);
    }

    @Override
    public List<Integer> handlerCodes() {
        return List.of(HEART_BEAT, VOTE, PUSH);
    }

}
